/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.iotdb.db.metadata.schemaregion.rocksdb;

import com.google.common.primitives.Bytes;
import org.apache.iotdb.commons.exception.MetadataException;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.metadata.mnode.IMeasurementMNode;
import org.apache.iotdb.db.metadata.mnode.MeasurementMNode;
import org.apache.iotdb.db.metadata.schemaregion.rocksdb.mnode.RMNodeType;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
import org.rocksdb.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Function;

import static org.apache.iotdb.db.metadata.schemaregion.rocksdb.RSchemaConstants.*;

public class RSchemaReadWriteHandler {

    private static final Logger logger = LoggerFactory.getLogger(RSchemaReadWriteHandler.class);

    protected static IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();

    private static final String ROCKSDB_FOLDER = "rocksdb-schema";

    private static final String[] INNER_TABLES =
            new String[]{new String(RocksDB.DEFAULT_COLUMN_FAMILY), TABLE_NAME_TAGS};

    public static final String ROCKSDB_PATH = config.getSystemDir() + File.separator + ROCKSDB_FOLDER;

    private RocksDB rocksDB;

    private RSchemaConfLoader rSchemaConfLoader;

    ConcurrentMap<String, ColumnFamilyHandle> columnFamilyHandleMap = new ConcurrentHashMap<>();
    List<ColumnFamilyDescriptor> columnFamilyDescriptors = new ArrayList<>();
    List<ColumnFamilyHandle> columnFamilyHandles = new ArrayList<>();

    static {
        RocksDB.loadLibrary();
    }

    public RSchemaReadWriteHandler(String path, RSchemaConfLoader schemaConfLoader)
            throws RocksDBException {
        this.rSchemaConfLoader = schemaConfLoader;
        initReadWriteHandler(path);
    }

    public RSchemaReadWriteHandler() throws RocksDBException {
        initReadWriteHandler(ROCKSDB_PATH);
    }

    private void initReadWriteHandler(String path) throws RocksDBException {
        try (Options options = new Options()) {
            org.rocksdb.Logger rocksDBLogger = new RSchemaLogger(options, logger);
            rocksDBLogger.setInfoLogLevel(InfoLogLevel.ERROR_LEVEL);

            options
                    .setCreateIfMissing(true)
                    .setAllowMmapReads(true)
                    .setWriteBufferSize(rSchemaConfLoader.getWriteBufferSize())
                    .setMaxWriteBufferNumber(rSchemaConfLoader.getMaxWriteBufferNumber())
                    .setMaxBackgroundJobs(rSchemaConfLoader.getMaxBackgroundJobs())
                    .setStatistics(new Statistics())
                    .setLogger(rocksDBLogger);

            final Filter bloomFilter = new BloomFilter(rSchemaConfLoader.getBloomFilterPolicy());

            final BlockBasedTableConfig tableOptions = new BlockBasedTableConfig();
            Cache cache = new LRUCache(rSchemaConfLoader.getBlockCache(), 6);
            tableOptions
                    .setBlockCache(cache)
                    .setFilterPolicy(bloomFilter)
                    .setBlockSizeDeviation(rSchemaConfLoader.getBlockSizeDeviation())
                    .setBlockSize(rSchemaConfLoader.getBlockSize())
                    .setBlockRestartInterval(rSchemaConfLoader.getBlockRestartInterval())
                    .setCacheIndexAndFilterBlocks(true)
                    .setBlockCacheCompressed(new LRUCache(rSchemaConfLoader.getBlockCacheCompressed(), 6));

            options.setTableFormatConfig(tableOptions);

            try (DBOptions dbOptions = new DBOptions(options)) {

                initColumnFamilyDescriptors(options, path);

                rocksDB = RocksDB.open(dbOptions, path, columnFamilyDescriptors, columnFamilyHandles);

                initInnerColumnFamilies();

                initRootKey();
            }
        }
    }

    private void initColumnFamilyDescriptors(Options options, String path) throws RocksDBException {
        List<byte[]> cfs = RocksDB.listColumnFamilies(options, path);
        if (cfs.isEmpty()) {
            cfs = new ArrayList<>();
            cfs.add(RocksDB.DEFAULT_COLUMN_FAMILY);
        }

        for (byte[] tableBytes : cfs) {
            columnFamilyDescriptors.add(
                    new ColumnFamilyDescriptor(tableBytes, new ColumnFamilyOptions()));
        }
    }

    private void initInnerColumnFamilies() throws RocksDBException {
        for (String tableNames : INNER_TABLES) {
            boolean tableCreated = false;
            for (ColumnFamilyHandle cfh : columnFamilyHandles) {
                if (tableNames.equals(new String(cfh.getName()))) {
                    tableCreated = true;
                    break;
                }
            }
            if (!tableCreated) {
                createTable(tableNames);
            }
        }
        for (ColumnFamilyHandle handle : columnFamilyHandles) {
            columnFamilyHandleMap.put(new String(handle.getName()), handle);
        }
    }

    private void initRootKey() throws RocksDBException {
        byte[] rootKey = RSchemaUtils.toRocksDBKey(ROOT, NODE_TYPE_ROOT);
        if (!keyExist(rootKey)) {
            rocksDB.put(rootKey, new byte[]{DATA_VERSION, DEFAULT_FLAG});
        }
    }

    private void createTable(String tableName) throws RocksDBException {
        ColumnFamilyHandle columnFamilyHandle =
                rocksDB.createColumnFamily(
                        new ColumnFamilyDescriptor(tableName.getBytes(), new ColumnFamilyOptions()));
        columnFamilyDescriptors.add(
                new ColumnFamilyDescriptor(tableName.getBytes(), new ColumnFamilyOptions()));
        columnFamilyHandles.add(columnFamilyHandle);
    }

    public ColumnFamilyHandle getColumnFamilyHandleByName(String columnFamilyName) {
        return columnFamilyHandleMap.get(columnFamilyName);
    }

    public void updateNode(byte[] key, byte[] value) throws RocksDBException {
        rocksDB.put(key, value);
    }

    public void createNode(String levelKey, RMNodeType type, byte[] value) throws RocksDBException {
        byte[] nodeKey = RSchemaUtils.toRocksDBKey(levelKey, type.getValue());
        rocksDB.put(nodeKey, value);
    }

    public void createNode(byte[] nodeKey, byte[] value) throws RocksDBException {
        rocksDB.put(nodeKey, value);
    }

    public void convertToEntityNode(String levelPath, byte[] value) throws RocksDBException {
        try (WriteBatch batch = new WriteBatch()) {
            byte[] internalKey = RSchemaUtils.toInternalNodeKey(levelPath);
            byte[] entityKey = RSchemaUtils.toEntityNodeKey(levelPath);
            batch.delete(internalKey);
            batch.put(entityKey, value);
            executeBatch(batch);
        }
    }

    public IMeasurementMNode getMeasurementMNode(PartialPath fullPath) throws MetadataException {
        String[] nodes = fullPath.getNodes();
        String key = RSchemaUtils.getLevelPath(nodes, nodes.length - 1);
        try {
            byte[] value = rocksDB.get(key.getBytes());
            if (value == null) {
                logger.warn("path not exist: {}", key);
                throw new MetadataException("key not exist");
            }
            return new MeasurementMNode(null, fullPath.getFullPath(), null, null);
        } catch (RocksDBException e) {
            throw new MetadataException(e);
        }
    }

    public boolean keyExistByType(String levelKey, RMNodeType type) throws RocksDBException {
        return keyExistByType(levelKey, type, new Holder<>());
    }

    public boolean keyExistByType(String levelKey, RMNodeType type, Holder<byte[]> holder)
            throws RocksDBException {
        byte[] key = RSchemaUtils.toRocksDBKey(levelKey, type.getValue());
        return keyExist(key, holder);
    }

    public CheckKeyResult keyExistByAllTypes(String levelKey) throws RocksDBException {
        RMNodeType[] types =
                new RMNodeType[]{
                        RMNodeType.ALISA,
                        RMNodeType.ENTITY,
                        RMNodeType.INTERNAL,
                        RMNodeType.MEASUREMENT,
                        RMNodeType.STORAGE_GROUP
                };
        return keyExistByTypes(levelKey, types);
    }

    public CheckKeyResult keyExistByTypes(String levelKey, RMNodeType... types)
            throws RocksDBException {
        CheckKeyResult result = new CheckKeyResult();
        try {
            Arrays.stream(types)
                    .forEach(
                            x -> {
                                byte[] key = Bytes.concat(new byte[]{(byte) x.getValue()}, levelKey.getBytes());
                                try {
                                    Holder<byte[]> holder = new Holder<>();
                                    boolean keyExisted = keyExist(key, holder);
                                    if (keyExisted) {
                                        result.setExistType(x.getValue());
                                        result.setValue(holder.getValue());
                                    }
                                } catch (RocksDBException e) {
                                    throw new RuntimeException(e);
                                }
                            });
        } catch (Exception e) {
            if (e.getCause() instanceof RocksDBException) {
                throw (RocksDBException) e.getCause();
            }
            throw e;
        }
        return result;
    }

    public boolean keyExist(byte[] key, Holder<byte[]> holder) throws RocksDBException {
        boolean exist = false;
        if (rocksDB.keyMayExist(key, holder)) {
            if (holder.getValue() == null) {
                byte[] value = rocksDB.get(key);
                if (value != null) {
                    exist = true;
                    holder.setValue(value);
                }
            } else {
                exist = true;
            }
        }
        return exist;
    }

    public boolean keyExist(byte[] key) throws RocksDBException {
        return keyExist(key, new Holder<>());
    }

    public void scanAllKeysRecursively(Set<String> seeds, int level, Function<String, Boolean> op) {
        if (seeds == null || seeds.isEmpty()) {
            return;
        }
        Set<String> children = ConcurrentHashMap.newKeySet();
        seeds
                .parallelStream()
                .forEach(
                        x -> {
                            if (Boolean.TRUE.equals(op.apply(x))) {
                                // x is not leaf node
                                String childrenPrefix = RSchemaUtils.getNextLevelOfPath(x, level);
                                children.addAll(getAllByPrefix(childrenPrefix));
                            }
                        });
        if (!children.isEmpty()) {
            scanAllKeysRecursively(children, level + 1, op);
        }
    }

    public Set<String> getAllByPrefix(String prefix) {
        Set<String> result = new HashSet<>();
        byte[] prefixKey = prefix.getBytes();
        try (RocksIterator iterator = rocksDB.newIterator()) {
            for (iterator.seek(prefixKey); iterator.isValid(); iterator.next()) {
                String key = new String(iterator.key());
                if (!key.startsWith(prefix)) {
                    break;
                }
                result.add(key);
            }
            return result;
        }
    }

    public byte[] get(ColumnFamilyHandle columnFamilyHandle, byte[] key) throws RocksDBException {
        if (columnFamilyHandle == null) {
            return rocksDB.get(key);
        }
        return rocksDB.get(columnFamilyHandle, key);
    }

    public RocksIterator iterator(ColumnFamilyHandle columnFamilyHandle) {
        if (columnFamilyHandle == null) {
            return rocksDB.newIterator();
        }
        return rocksDB.newIterator(columnFamilyHandle);
    }

    public boolean existAnySiblings(String siblingPrefix) {
        for (char type : ALL_NODE_TYPE_ARRAY) {
            byte[] key = RSchemaUtils.toRocksDBKey(siblingPrefix, type);
            try (RocksIterator iterator = rocksDB.newIterator()) {
                for (iterator.seek(key); iterator.isValid(); iterator.next()) {
                    if (!RSchemaUtils.prefixMatch(iterator.key(), key)) {
                        break;
                    } else {
                        return true;
                    }
                }
            }
        }
        return false;
    }

    public void getKeyByPrefix(String innerName, Function<String, Boolean> function) {
        try (RocksIterator iterator = rocksDB.newIterator()) {
            for (iterator.seek(innerName.getBytes()); iterator.isValid(); iterator.next()) {
                String keyStr = new String(iterator.key());
                if (!keyStr.startsWith(innerName)) {
                    break;
                }
                function.apply(keyStr);
            }
        }
    }

    public Map<byte[], byte[]> getKeyValueByPrefix(String innerName) {
        try (RocksIterator iterator = rocksDB.newIterator()) {
            Map<byte[], byte[]> result = new HashMap<>();
            for (iterator.seek(innerName.getBytes()); iterator.isValid(); iterator.next()) {
                String keyStr = new String(iterator.key());
                if (!keyStr.startsWith(innerName)) {
                    break;
                }
                result.put(iterator.key(), iterator.value());
            }
            return result;
        }
    }

    public String findBelongToSpecifiedNodeType(String[] nodes, char nodeType) {
        String innerPathName;
        for (int level = nodes.length; level > 0; level--) {
            String[] copy = Arrays.copyOf(nodes, level);
            innerPathName = RSchemaUtils.convertPartialPathToInnerByNodes(copy, level, nodeType);
            boolean isBelongToType = rocksDB.keyMayExist(innerPathName.getBytes(), new Holder<>());
            if (isBelongToType) {
                return innerPathName;
            }
        }
        return null;
    }

    public void executeBatch(WriteBatch batch) throws RocksDBException {
        rocksDB.write(new WriteOptions(), batch);
    }

    public void deleteNode(String[] nodes, RMNodeType type) throws RocksDBException {
        byte[] key =
                RSchemaUtils.toRocksDBKey(
                        RSchemaUtils.getLevelPath(nodes, nodes.length - 1), type.getValue());
        rocksDB.delete(key);
    }

    public void deleteByKey(byte[] key) throws RocksDBException {
        rocksDB.delete(key);
    }

    public void deleteNodeByPrefix(byte[] startKey, byte[] endKey) throws RocksDBException {
        rocksDB.deleteRange(startKey, endKey);
    }

    public void deleteNodeByPrefix(ColumnFamilyHandle handle, byte[] startKey, byte[] endKey)
            throws RocksDBException {
        rocksDB.deleteRange(handle, startKey, endKey);
    }

    @TestOnly
    public void scanAllKeys(String filePath) throws IOException {
        try (RocksIterator iterator = rocksDB.newIterator()) {
            logger.info("\n-----------------scan rocksdb start----------------------");
            iterator.seekToFirst();
            File outputFile = new File(filePath);
            if (outputFile.exists()) {
                boolean deleted = outputFile.delete();
                logger.info("delete output file: " + deleted);
            }
            try (BufferedOutputStream outputStream =
                         new BufferedOutputStream(new FileOutputStream(outputFile))) {
                while (iterator.isValid()) {
                    byte[] key = iterator.key();
                    key[0] = (byte) (key[0] + '0');
                    outputStream.write(key);
                    outputStream.write(" -> ".getBytes());

                    byte[] value = iterator.value();
                    ByteBuffer byteBuffer = ByteBuffer.wrap(value);
                    // skip the version flag and node type flag
                    ReadWriteIOUtils.readBytes(byteBuffer, 2);
                    while (byteBuffer.hasRemaining()) {
                        byte blockType = ReadWriteIOUtils.readByte(byteBuffer);
                        switch (blockType) {
                            case RSchemaConstants.DATA_BLOCK_TYPE_TTL:
                                long l = ReadWriteIOUtils.readLong(byteBuffer);
                                outputStream.write(String.valueOf(l).getBytes());
                                outputStream.write(" ".getBytes());
                                break;
                            case DATA_BLOCK_TYPE_SCHEMA:
                                MeasurementSchema schema = MeasurementSchema.deserializeFrom(byteBuffer);
                                outputStream.write(schema.toString().getBytes());
                                outputStream.write(" ".getBytes());
                                break;
                            case RSchemaConstants.DATA_BLOCK_TYPE_ALIAS:
                                String str = ReadWriteIOUtils.readString(byteBuffer);
                                outputStream.write(Objects.requireNonNull(str).getBytes());
                                outputStream.write(" ".getBytes());
                                break;
                            case DATA_BLOCK_TYPE_ORIGIN_KEY:
                                byte[] originKey = RSchemaUtils.readOriginKey(byteBuffer);
                                outputStream.write(originKey);
                                outputStream.write(" ".getBytes());
                                break;
                            case RSchemaConstants.DATA_BLOCK_TYPE_TAGS:
                            case RSchemaConstants.DATA_BLOCK_TYPE_ATTRIBUTES:
                                Map<String, String> map = ReadWriteIOUtils.readMap(byteBuffer);
                                for (Map.Entry<String, String> entry : Objects.requireNonNull(map).entrySet()) {
                                    outputStream.write(
                                            ("<" + entry.getKey() + "," + entry.getValue() + ">").getBytes());
                                }
                                outputStream.write(" ".getBytes());
                                break;
                            default:
                                break;
                        }
                    }
                    outputStream.write("\n".getBytes());
                    iterator.next();
                }
            }
            logger.info("\n-----------------scan rocksdb end----------------------");
        }
    }

    public void close() throws RocksDBException {
        rocksDB.syncWal();
        rocksDB.closeE();
    }
}
